pythonでmultiprocess/threadとjoblib¶
multiprocess/thread¶
- interfaceが似ている
- threadは見たことがない
import multiprocessing as mp
import threading as th
import datetime
n_loop = 10000 ** 2
n_x = 8
def loop():
for _ in range(n_loop):
pass
def run(klass):
xs = []
for _ in range(n_x):
x = klass(target=loop)
x.start()
xs.append(x)
for x in xs:
x.join()
def main():
print("start", datetime.datetime.now())
print("-"*10)
run(mp.Process)
print("run end(mp)", datetime.datetime.now())
print("-"*10)
run(th.Thread)
print("run end(th)", datetime.datetime.now())
print("-"*10)
for _ in range(n_x):
loop()
print("loop end", datetime.datetime.now())
if __name__ == "__main__":
main()
arguments¶
引数はiterator, keyword引数はdict
multiprocess_using_arguments.py
def print_args(arg1, arg2, kwarg1=3, kwarg2=4, **kwargs):
print(arg1, arg2)
print(kwarg1, kwarg2)
print(kwargs)
return arg1 + arg2
x = mp.Process(target=print_args, args=(1, 2),
kwargs={"kwarg1":5, "kwargs100":10})
x.start()
1 2
5 4
{'kwargs100': 100}
Pool¶
- 結果を返してくれる
- lambdaがダメだった
- mapは単一引数
- startmapは複数可能
- applyはkeywordを渡せる
- async版がある
- context抜ける前に処理しないとダメ
def add(x, y): return x + y
with mp.Pool() as p:
results = p.starmap(add, [(1, 2), (3, 4)])
result = p.apply(print_args, args=(5, 6), kwds={"kwarg1":5, "kwarg100":100})
async_result = p.apply_async(print_args, args=(7, 8), kwds={"kwarg1":5, "kwarg100":100})
print(async_result.get(timeout=2))
# => 15
print(results)
# => [3, 7]
print(result)
# => 11
other¶
- Queue/Pipe
- message passing
- Lock
- Semaphore
もっと簡単に使いたい場合や非同期処理は下記でよさそう
- concurrent.futures
- async/await
joblib¶
- sklearnのexternalsとして利用されている
- Queueはないので置き換えらなさそう
- multiprocessingより良い点がある
- https://pythonhosted.org/joblib/parallel.html#bad-interaction-of-multiprocessing-and-third-party-libraries
- 雑に測定してみた
import joblib
import time
%timeit joblib.Parallel(n_jobs=1)(joblib.delayed(time.sleep)(0.1) for i in range(100))
1 loop, best of 3: 10.3 s per loop
%timeit joblib.Parallel(n_jobs=-1)(joblib.delayed(time.sleep)(0.1) for i in range(100))
1 loop, best of 3: 2.7 s per loop
%timeit [time.sleep(0.1) for i in range(100)]
1 loop, best of 3: 10.2 s per loop
joblib.Parallel(n_jobs=-1)(
joblib.delayed(print_args)(i, 2, kwarg1=5, kwarg100=100)
for i in range(3))